ectd在V3之後, 都透過rRPC+Protobuf來跟etcd Server做溝通調用.
所以在etcd/etcdserver/etcdserverpb/rpc.proto
底下定義了6類gRPC service
etcd clientV3則是對這些這些服務又定義了Interface並且實做了具體類別.
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
Do(ctx context.Context, op Op) (OpResponse, error)
Txn(ctx context.Context) Txn
}
// Client provides and manages an etcd v3 client session.
type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance
conn *grpc.ClientConn
cfg Config
creds grpccredentials.TransportCredentials
resolverGroup *endpoint.ResolverGroup
mu *sync.RWMutex
ctx context.Context
cancel context.CancelFunc
// Username is a user name for authentication.
Username string
// Password is a password for authentication.
Password string
authTokenBundle credentials.Bundle
callOpts []grpc.CallOption
lg *zap.Logger
}
可以看到Client結構裡面包含了gRPC的conn, 該conn還實現了gRPC RoundRobin的負載均衡策略.
利用第12天的Docker來連線
先建立Config物件
var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
cli, err := clientv3.New(clientv3.Config{
Context: cancelCtx,
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
這裡只有簡單的把etcd節點(多個節點用,
併成字串陣列)跟建立grpc連線的timeout,如果超時都沒連線成功就返回error.
還有context,這能用來取消grpc的dial-out連線.
也能設定Username
,Password
,如果etcd的Auth有配置帳密的話.
或者透過TLS給憑證.
透過clientv3.New取得Client實例.
它會呼叫一個私有方法newClient()
// New creates a new etcdv3 client from a given configuration.
func New(cfg Config) (*Client, error) {
if len(cfg.Endpoints) == 0 {
return nil, ErrNoAvailableEndpoints
}
return newClient(&cfg)
}
func newClient(cfg *Config) (*Client, error) {
// 略
conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
if err != nil {
client.cancel()
client.resolverGroup.Close()
return nil, err
}
// TODO: With the old grpc balancer interface, we waited until the dial timeout
// for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
client.conn = conn
client.Cluster = NewCluster(client)
client.KV = NewKV(client)
client.Lease = NewLease(client)
client.Watcher = NewWatcher(client)
client.Auth = NewAuth(client)
client.Maintenance = NewMaintenance(client)
// 略
return client, nil
}
可以看到就是這段實做了負載均衡.
然後NewKV就是建立了KV gRPC Client的實例.
其他gRPC Client也是以一樣的方式建立, 並組合進去client物件後返回.
只要初始化連線上了, 之後就不需要關心重連問題, client內部會自動重連.
對etcdClient物件, 呼叫NewKV, 就會得到一個KV service的實現;
這裡是獲得一個會處理錯誤retry的KV對象.
func NewKV(c *Client) KV {
api := &kv{remote: RetryKVClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
error retry主要的代碼在此
概念就是對grpc的Unary或Stream配置Interceptor攔截器.
來玩看看CRUD, 順便看看第11天介紹的Revision
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"time"
"go.etcd.io/etcd/clientv3"
)
var (
dialTimeout = 10 * time.Second
requestTimeout = 3 * time.Second
)
func main() {
cancelCtx, canFunc := context.WithCancel(context.Background())
timeOutCtx, _ := context.WithTimeout(context.Background(), requestTimeout)
var endpoints = []string{"172.16.238.100:2379", "172.16.238.101:2379", "172.16.238.101:2379"}
cli, err := clientv3.New(clientv3.Config{
Context: cancelCtx,
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatalln(err)
}
defer cli.Close()
kv := clientv3.NewKV(cli)
singleKeyWithRevision(timeOutCtx, kv)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
canFunc()
fmt.Println("Service Terminate")
}
func singleKeyWithRevision(ctx context.Context, kv clientv3.KV) {
fmt.Println("SingleKeyWithRevision()")
key := "demo"
// 刪除之前遺留的Key
kv.Delete(ctx, key, clientv3.WithPrefix())
// 新增一組KV {"Demo": "444"}
pr, _ := kv.Put(ctx, key, time.Now().UTC().String())
rev := pr.Header.Revision
fmt.Println("Revision:", rev)
gr, _ := kv.Get(ctx, key)
fmt.Println("Value: ", string(gr.Kvs[0].Value), "Revision: ", gr.Header.Revision)
// 透過Put修改現有的Key, 會建立新的revision
kv.Put(ctx, key, time.Now().UTC().String())
gr, _ = kv.Get(ctx, key)
fmt.Println("Value: ", string(gr.Kvs[0].Value), "Revision: ", gr.Header.Revision)
// 取得上一版的revision
gr, _ = kv.Get(ctx, key, clientv3.WithRev(rev))
fmt.Println("Value: ", string(gr.Kvs[0].Value), "Revision: ", gr.Header.Revision)
}
/*
SingleKeyWithRevision()
Revision: 1954477
Value: 2020-09-06 15:47:16.398487325 +0000 UTC Revision: 1954477
Value: 2020-09-06 15:47:16.400697144 +0000 UTC Revision: 1954478
Value: 2020-09-06 15:47:16.398487325 +0000 UTC Revision: 1954478
*/
從上面結果能看出, 每次修改都會保存之前的revision與資料.